{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kinesis Data Analytics\n",
    "\n",
    "![](img/kinesis_data_analytics_docs.png)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "import sagemaker\n",
    "import pandas as pd\n",
    "\n",
    "sess   = sagemaker.Session()\n",
    "bucket = sess.default_bucket()\n",
    "role = sagemaker.get_execution_role()\n",
    "region = boto3.Session().region_name\n",
    "\n",
    "kinesis = boto3.Session().client(service_name='kinesis', region_name=region)\n",
    "analytics = boto3.client('kinesisanalyticsv2')\n",
    "sts = boto3.Session().client(service_name='sts', region_name=region)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Download Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!aws s3 cp 's3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Software_v1_00.tsv.gz' ./data/"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import csv\n",
    "import pandas as pd\n",
    "\n",
    "df = pd.read_csv('./data/amazon_reviews_us_Digital_Software_v1_00.tsv.gz', \n",
    "                 delimiter='\\t', \n",
    "                 quoting=csv.QUOTE_NONE,\n",
    "                 compression='gzip')\n",
    "df.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.head(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "partition_key = '123'\n",
    "#reviews_tsv = '5\\tThis is a 5 star review\\n1\\tThis is a 1 star review\\n'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_star_rating_and_review_body = df[['star_rating', 'review_body']][:100]\n",
    "df_star_rating_and_review_body.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_star_rating_and_review_body.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "reviews_tsv = df_star_rating_and_review_body.to_csv(sep='\\t',\n",
    "                                                    header=None,\n",
    "                                                    index=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "reviews_tsv"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r iam_role_kinesis_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "arn:aws:iam::979770387749:role/DSOAWS_Kinesis\n"
     ]
    }
   ],
   "source": [
    "print(iam_role_kinesis_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r data_stream_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "arn:aws:kinesis:ap-southeast-2:979770387749:stream/dsoaws-data-stream\n"
     ]
    }
   ],
   "source": [
    "print(data_stream_arn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "%store -r firehouse_delivery_stream_arn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "arn:aws:firehose:ap-southeast-2:979770387749:deliverystream/dsoaws-firehose-stream\n"
     ]
    }
   ],
   "source": [
    "print(firehouse_delivery_stream_arn)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# TODO:  Data Analytics\n",
    "https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesisanalyticsv2.html#KinesisAnalyticsV2.Client.create_application"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "ename": "ClientError",
     "evalue": "An error occurred (ValidationException) when calling the CreateApplication operation: 3 validation errors detected: Value 'DSOAWS_Kinesis_Data_Analytics_Application_Output' at 'applicationConfiguration.sqlApplicationConfiguration.outputs.1.member.name' failed to satisfy constraint: Member must have length less than or equal to 32; Value 'string' at 'applicationConfiguration.sqlApplicationConfiguration.inputs.1.member.inputProcessingConfiguration.inputLambdaProcessor.resourceARN' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:.*; Value 'DSOAWS_Kinesis_Data_Analytics_Application_Output' at 'applicationConfiguration.sqlApplicationConfiguration.inputs.1.member.namePrefix' failed to satisfy constraint: Member must have length less than or equal to 32",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mClientError\u001b[0m                               Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-27-a222aa3facf4>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m    101\u001b[0m         },\n\u001b[1;32m    102\u001b[0m         'ApplicationCodeConfiguration' : {\n\u001b[0;32m--> 103\u001b[0;31m             \u001b[0;34m'CodeContentType'\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m'PLAINTEXT'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    104\u001b[0m         }\n\u001b[1;32m    105\u001b[0m     }\n",
      "\u001b[0;32m~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py\u001b[0m in \u001b[0;36m_api_call\u001b[0;34m(self, *args, **kwargs)\u001b[0m\n\u001b[1;32m    314\u001b[0m                     \"%s() only accepts keyword arguments.\" % py_operation_name)\n\u001b[1;32m    315\u001b[0m             \u001b[0;31m# The \"self\" in this scope is referring to the BaseClient.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 316\u001b[0;31m             \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_make_api_call\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0moperation_name\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    317\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    318\u001b[0m         \u001b[0m_api_call\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__name__\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mstr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpy_operation_name\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m~/anaconda3/envs/python3/lib/python3.6/site-packages/botocore/client.py\u001b[0m in \u001b[0;36m_make_api_call\u001b[0;34m(self, operation_name, api_params)\u001b[0m\n\u001b[1;32m    624\u001b[0m             \u001b[0merror_code\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mparsed_response\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Error\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m{\u001b[0m\u001b[0;34m}\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"Code\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    625\u001b[0m             \u001b[0merror_class\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexceptions\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfrom_code\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0merror_code\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 626\u001b[0;31m             \u001b[0;32mraise\u001b[0m \u001b[0merror_class\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mparsed_response\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moperation_name\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    627\u001b[0m         \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    628\u001b[0m             \u001b[0;32mreturn\u001b[0m \u001b[0mparsed_response\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mClientError\u001b[0m: An error occurred (ValidationException) when calling the CreateApplication operation: 3 validation errors detected: Value 'DSOAWS_Kinesis_Data_Analytics_Application_Output' at 'applicationConfiguration.sqlApplicationConfiguration.outputs.1.member.name' failed to satisfy constraint: Member must have length less than or equal to 32; Value 'string' at 'applicationConfiguration.sqlApplicationConfiguration.inputs.1.member.inputProcessingConfiguration.inputLambdaProcessor.resourceARN' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:.*; Value 'DSOAWS_Kinesis_Data_Analytics_Application_Output' at 'applicationConfiguration.sqlApplicationConfiguration.inputs.1.member.namePrefix' failed to satisfy constraint: Member must have length less than or equal to 32"
     ]
    }
   ],
   "source": [
    "response = analytics.create_application(\n",
    "    ApplicationName='DSOAWS_Kinesis_Data_Analytics_Application',\n",
    "#    ApplicationDescription='string',\n",
    "    RuntimeEnvironment='SQL-1_0',\n",
    "#    RuntimeEnvironment='SQL-1_0'|'FLINK-1_6'|'FLINK-1_8',\n",
    "    ServiceExecutionRole=iam_role_kinesis_arn,\n",
    "    ApplicationConfiguration={\n",
    "        'SqlApplicationConfiguration': {\n",
    "            'Inputs': [\n",
    "                {\n",
    "                    'NamePrefix': 'DSOAWS_Kinesis_Data_Analytics_Application_Output',\n",
    "                    'InputProcessingConfiguration': {\n",
    "                        'InputLambdaProcessor': {\n",
    "                            'ResourceARN': 'string'\n",
    "                        }\n",
    "                    },\n",
    "                    'KinesisStreamsInput': {\n",
    "                        'ResourceARN': data_stream_arn\n",
    "                    },\n",
    "#                    'KinesisFirehoseInput': {\n",
    "#                        'ResourceARN': 'string'\n",
    "#                    },\n",
    "                    'InputParallelism': {\n",
    "                        'Count': 1\n",
    "                    },\n",
    "                    'InputSchema': {\n",
    "                        'RecordFormat': {\n",
    "                            'RecordFormatType': 'CSV',\n",
    "                            'MappingParameters': {\n",
    "#                                 'JSONMappingParameters': {\n",
    "#                                     'RecordRowPath': 'string'\n",
    "#                                 },\n",
    "                                'CSVMappingParameters': {\n",
    "                                    'RecordRowDelimiter': '\\n',\n",
    "                                    'RecordColumnDelimiter': '\\t'\n",
    "                                }\n",
    "                            }\n",
    "                        },\n",
    "                        'RecordEncoding': 'UTF-8',\n",
    "                        'RecordColumns': [\n",
    "                            {\n",
    "                                'Name': 'star_rating',\n",
    "                                'SqlType': 'VARCHAR'\n",
    "                            },\n",
    "                            {\n",
    "                                'Name': 'review_body',\n",
    "                                'SqlType': 'VARCHAR'\n",
    "                            },                            \n",
    "                        ]\n",
    "                    }\n",
    "                },\n",
    "            ],\n",
    "            'Outputs': [\n",
    "                {\n",
    "                    'Name': 'DSOAWS_Kinesis_Data_Analytics_Application_Output',\n",
    "#                     'KinesisStreamsOutput': {\n",
    "#                         'ResourceARN': 'string'\n",
    "#                     },\n",
    "                    'KinesisFirehoseOutput': {\n",
    "                        'ResourceARN': firehouse_delivery_stream_arn\n",
    "                    },\n",
    "#                     'LambdaOutput': {\n",
    "#                         'ResourceARN': 'string'\n",
    "#                     },\n",
    "                    'DestinationSchema': {\n",
    "                        'RecordFormatType': 'CSV'\n",
    "                    }\n",
    "                },\n",
    "            ],\n",
    "#             'ReferenceDataSources': [\n",
    "#                 {\n",
    "#                     'TableName': 'string',\n",
    "#                     'S3ReferenceDataSource': {\n",
    "#                         'BucketARN': 'string',\n",
    "#                         'FileKey': 'string'\n",
    "#                     },\n",
    "#                     'ReferenceSchema': {\n",
    "#                         'RecordFormat': {\n",
    "#                             'RecordFormatType': 'JSON'|'CSV',\n",
    "#                             'MappingParameters': {\n",
    "#                                 'JSONMappingParameters': {\n",
    "#                                     'RecordRowPath': 'string'\n",
    "#                                 },\n",
    "#                                 'CSVMappingParameters': {\n",
    "#                                     'RecordRowDelimiter': 'string',\n",
    "#                                     'RecordColumnDelimiter': 'string'\n",
    "#                                 }\n",
    "#                             }\n",
    "#                         },\n",
    "#                         'RecordEncoding': 'string',\n",
    "#                         'RecordColumns': [\n",
    "#                             {\n",
    "#                                 'Name': 'string',\n",
    "#                                 'Mapping': 'string',\n",
    "#                                 'SqlType': 'string'\n",
    "#                             },\n",
    "#                         ]\n",
    "#                     }\n",
    "#                 },\n",
    "#             ]            \n",
    "        },\n",
    "        'ApplicationCodeConfiguration' : {\n",
    "            'CodeContentType': 'PLAINTEXT'\n",
    "        }\n",
    "    }\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
